#! /usr/bin/env python

"""This _hdfs module provides an interface to the Hadoop Distributed
File System (HDFS).

It can be used to check paths, create and delete files/directories, and
copy files between the local filesystem and a given HDFS. The active
implementation shells out to the ``hadoop fs`` command-line client; the
older SWIG-compiled libhdfs (pyhdfs) bindings remain as commented-out
code below.

"""

from threading import Lock
import queue
import io
import logging
import logging.config
import ma.commons.core.constants as constants
import ma.commons.core.filetransferinfo as filetransferinfo
import ma.const
import ma.log
import os
import os.path
import subprocess
import sys

#import pyhdfs



class HDFS(object):
    """This object is used to interact with the Hadoop Distributed File System

    This object can be used for basic interaction with HDFS including
    checking paths, creating/deleting files and directories, and copying
    files between the local filesystem and HDFS (and vice versa). All
    active operations shell out to the ``hadoop fs`` command-line client;
    the SWIG/libhdfs (pyhdfs) bindings are retained below as commented-out
    code.

    TODO: throw exceptions to the __log

    """

    def __init__(self, host="default", port=0):
        """Constructor for HDFS class

        Takes a host and a port. If not defined will be picked from
        the siteconf.xml

        """
        #initialize __log for HDFS
        self.__log = ma.log.get_logger("ma.fs")
        # handle for a pyhdfs connection; unused while the CLI path is active
        self.hdfs = None

        # for keeping stats of files transferred from DFS to the local FS
        self.__stats_to_local_lock = Lock()
        self.__file_stats_to_local_queue = queue.Queue()
        self.__file_transfer_to_local_id = constants.DFS_TRANSFER_ID_DEFAULT

        # for keeping stats of files transferred from the local FS to DFS;
        # lock used to update the number of bytes sent
        self.__stats_to_dfs_lock = Lock()
        self.__file_stats_to_dfs_queue = queue.Queue()
        self.__file_transfer_to_dfs_id = constants.DFS_TRANSFER_ID_DEFAULT

        # number of times a failed copy-to-local may be retried
        self.__hdfs_copy_retries = ma.const.XmlData.get_int_data(ma.const.xml_dfs_copy_retries)

        try:
            #self.connect(host, port)

            # constants
            self.__hadoop_exec_path = ma.const.XmlData.get_dfs_filepath_str_data(ma.const.xml_hadoop_exec)
            self.__EXEC_DFS_PARAMETER = 'fs'

            # stats variables (cumulative bytes transferred each way)
            self.__copied_to_local_bytes = 0
            self.__copied_to_dfs_bytes = 0

            self.__log.info('-- Initialized HDFS Connector --')
        except Exception as err_msg:
            self.__log.exception("Error while connecting to HDFS: %s", err_msg)

    """
    def connect(self, host, port):
        try:
            # check if the object is already connected to an HDFS
            if self.hdfs != None:
                # disconnect from the current framework before connecting
                self.disconnect()
            
            # attempt to connect
            self.hdfs = pyhdfs.hdfsConnect(host, port)
            if self.hdfs == None:
                self.__log.error("Error while connecting to HDFS")
        except:
            self.__log.exception("Error while connecting to HDFS")
    
    def disconnect(self):
        try:
            # run only if connected
            if self.hdfs != None:
                ret = pyhdfs.hdfsDisconnect(self.hdfs)
                self.hdfs = None
                if ret == -1:
                    raise StandardError("Error not disconnecting")
            else:
                raise UserWarning("Not Connected")
        except Exception:
            print sys.exc_info()
        
    def copy(self, source, target, t_hadoopDFS=None):
        if t_hadoopDFS == None:
            t_hadoopDFS = self
        if not isinstance(t_hadoopDFS, HDFS):
            raise TypeError("Target DFS must be a HadoopDFS")
        a = pyhdfs.hdfsCopy(self.hdfs, source, t_hadoopDFS.hdfs, target)
        if  a == -1:
            raise IOError("Error copying")
        
    def move(self, source, target, t_hadoopDFS=None):
        if t_hadoopDFS == None:
            t_hadoopDFS = self
        if not isinstance(t_hadoopDFS, HDFS):
            raise TypeError("Target DFS must be a HadoopDFS")
        a = pyhdfs.hdfsMove(self.hdfs, source, t_hadoopDFS.hdfs, target)
        if  a == -1:
            raise IOError("Error moving")
    """

    def check_path(self, hdfs_path):
        """This function returns 0 if the path is directory, 1 if
        it is a file, and -1 if the path does not exist.

        Refer to http://hadoop.apache.org/common/docs/current/hdfs_shell.html#test

        NOTE(review): only ``-test -d`` is executed, so exit code 1 is
        interpreted as "file" and 255 as "does not exist" -- confirm these
        exit codes against the installed Hadoop version.

        Raises IOError on any unexpected HDFS error.
        """
        TEST_ARG = '-test'
        try:
            # check if directory
            cmd = [ self.__hadoop_exec_path, self.__EXEC_DFS_PARAMETER, TEST_ARG, '-d', hdfs_path ]
            self.__log.debug('Executing HDFS command: %s', str(cmd))
            ret = subprocess.call(cmd)

            self.__log.info('Checked an HDFS Filepath: %s', hdfs_path)

            # if directory
            if ret == 0:
                return 0
            # if file
            elif ret == 1:
                return 1
            # if does not exist
            elif ret == 255:
                return -1
            else:
                raise Exception('HDFS Error')

        # BUGFIX: was the Python 2-only form "except Exception, err_msg:",
        # which is a SyntaxError under Python 3
        except Exception as err_msg:
            self.__log.exception("Error while checking an HDFS path: %s", err_msg)
            raise IOError("Error checking HDFS path")


    def delete_file(self, hdfs_filepath, run_pre_checks=False):
        """This function first checks if the given path is file,
        and then deletes it. Returns true on success.

        Returns False (without deleting) if run_pre_checks is True and the
        path is not an existing file. Raises IOError on HDFS errors.
        """
        DELETE_ARG = '-rm'
        try:
            #ret = pyhdfs.hdfsDelete(self.hdfs, filename)

            if run_pre_checks:
                # check if file
                ret = self.check_path(hdfs_filepath)
                if ret != 1:
                    self.__log.warning('The path given is not an existing file: %s', hdfs_filepath)
                    return False

            # delete the file
            cmd = [ self.__hadoop_exec_path, self.__EXEC_DFS_PARAMETER, DELETE_ARG, hdfs_filepath ]
            self.__log.debug('Executing HDFS command: %s', str(cmd))
            ret = subprocess.call(cmd)

            # non-zero exit code means the hadoop CLI failed
            if ret != 0:
                raise Exception('HDFS Error')

            self.__log.info('Deleted HDFS File: %s', hdfs_filepath)

            return True
        except Exception as err_msg:
            self.__log.exception("Error while deleting an HDFS file: %s", err_msg)
            raise IOError("Error deleting HDFS file")


    def delete_dir(self, hdfs_path, run_pre_checks=False):
        """This function first checks if the given path is directory,
        and then deletes it including its contents. Returns true on
        success.

        Returns False (without deleting) if run_pre_checks is True and the
        path is not an existing directory. Raises IOError on HDFS errors.
        """
        DELETE_R_ARG = '-rmr'
        try:
            #ret = pyhdfs.hdfsDelete(self.hdfs, filename)

            if run_pre_checks:
                # check if directory
                ret = self.check_path(hdfs_path)
                if ret != 0:
                    self.__log.warning('The path given is not an existing directory: %s', hdfs_path)
                    return False

            # delete the path (recursive remove)
            cmd = [ self.__hadoop_exec_path, self.__EXEC_DFS_PARAMETER, DELETE_R_ARG, hdfs_path ]
            self.__log.debug('Executing HDFS command: %s', str(cmd))
            ret = subprocess.call(cmd)

            # non-zero exit code means the hadoop CLI failed
            if ret != 0:
                raise Exception('HDFS Error')

            self.__log.info('Deleted HDFS Directory: %s', hdfs_path)

            return True
        except Exception as err_msg:
            self.__log.exception("Error while deleting an HDFS directory: %s", err_msg)
            raise IOError("Error deleting HDFS directory")

    """
    def rename(self, old, new):
        a = pyhdfs.hdfsRename(self.hdfs, old, new)
        if  a == -1:
            raise IOError("Error renaming")
    """

    def create_dir(self, hdfs_path, run_pre_checks=False):
        """This function creates a directory recursively i.e. any non
        existing parents in its path will also be created (mkdir -p!)

        Returns True on success, False if run_pre_checks is True and the
        directory already exists. Raises IOError on HDFS errors.
        """
        CREATE_R_ARG = '-mkdir'
        try:
            #ret = pyhdfs.hdfsCreateDirectory(self.hdfs, name)

            if run_pre_checks:
                # check if directory
                ret = self.check_path(hdfs_path)
                if ret == 0:
                    self.__log.warning('The path given is already an existing directory: %s', hdfs_path)
                    return False

            # create the path (comment previously said "delete" -- it creates)
            cmd = [ self.__hadoop_exec_path, self.__EXEC_DFS_PARAMETER, CREATE_R_ARG, hdfs_path ]
            self.__log.debug('Executing HDFS command: %s', str(cmd))
            ret = subprocess.call(cmd)

            # non-zero exit code means the hadoop CLI failed
            if ret != 0:
                raise Exception('HDFS Error')

            self.__log.info('Created HDFS Directory: %s', hdfs_path)

            return True
        except Exception as err_msg:
            self.__log.exception("Error while creating an HDFS directory: %s", err_msg)
            raise IOError("Error creating an HDFS directory")



    """    
    def setWorkingDirectory(self, name):
        a = pyhdfs.hdfsSetWorkingDirectory(self.hdfs, name)
        if  a == -1:
            raise IOError("Error setting working directory")
        
    def getWorkingDirectory(self, length=255):
        a = pyhdfs.hdfsGetWorkingDirectory(self.hdfs, length)
        if a == '':
            raise IOError("Error getting working directory")
        return a
    
    def getCapacity(self):
        a = pyhdfs.hdfsGetCapacity(self.hdfs)
        if a == -1:
            raise IOError("Error getting capacity")
        return a
    
    def getUsed(self):
        a = pyhdfs.hdfsGetUsed(self.hdfs)
        if  a == -1:
            raise IOError("Error getting used")
        return a
    
    def pathExists(self, name):
        a = pyhdfs.hdfsExists(self.hdfs, name)
        if a == -1:
            raise IOError("Existential error")
        return True
    """

    def copy_file_to_local(self, hdfs_filepath, dest_dir, job_id_log=-1, run_pre_checks=False, auto_retry=False):
        """This function first checks if the given path is a file on
        the DFS, and then checks if the dest_dir is an existing
        directory. If all this is true, it will copy the file from the
        DFS to the local FS. If the file already exists in the dest dir,
        it is over-written. Returns true on success.

        If auto_retry is True, a failed copy is retried up to the
        configured number of retries before raising IOError.
        """

        COPY_TO_LOCAL_ARG = '-copyToLocal'

        is_copy_complete = False
        retries_available = self.__hdfs_copy_retries

        while not is_copy_complete and retries_available > 0:
            try:
                #ret = pyhdfs.hdfsDelete(self.hdfs, filename)
                filename = os.path.basename(hdfs_filepath)
                dest_filepath = os.path.join(dest_dir, filename)

                if run_pre_checks:
                    # check if file
                    ret = self.check_path(hdfs_filepath)
                    if ret != 1:
                        # BUGFIX: previously logged dest_dir here instead of
                        # the offending DFS filepath
                        self.__log.warning('The DFS filepath given is not a valid file: %s', hdfs_filepath)
                        return False

                    # check if the path exists
                    if not os.path.isdir(dest_dir):
                        self.__log.warning('The local path given is not an existing directory: %s', dest_dir)
                        return False

                    # delete if file already exists in the destination dir
                    if os.path.isfile(dest_filepath):
                        self.__log.info('Deleting old local file: %s', dest_filepath)
                        # BUGFIX: os.remove() takes no positional second
                        # argument; the old call os.remove(path, False) raised
                        # TypeError
                        os.remove(dest_filepath)

                # create file stats object; lock guards the shared transfer id
                with self.__stats_to_local_lock:
                    transfer_info = filetransferinfo.FileTransferInfo(self.__file_transfer_to_local_id, dest_filepath, 0, job_id_log)
                    # adjust the id for next transfer
                    self.__file_transfer_to_local_id = filetransferinfo.get_next_id(self.__file_transfer_to_local_id)

                # copy hdfs file to local dir
                cmd = [ self.__hadoop_exec_path, self.__EXEC_DFS_PARAMETER, COPY_TO_LOCAL_ARG, hdfs_filepath, dest_dir ]
                self.__log.debug('Executing HDFS command: %s', str(cmd))
                transfer_info.note_transfer_start()
                ret = subprocess.call(cmd)
                transfer_info.note_transfer_end()

                # non-zero exit code means the hadoop CLI failed
                if ret != 0:
                    raise Exception('HDFS Error')

                # update copied to local bytes stat (size known only after copy)
                size = os.path.getsize(dest_filepath)
                transfer_info.file_size = size

                # add file bytes to total bytes transferred
                with self.__stats_to_local_lock:
                    self.__copied_to_local_bytes += size

                # insert file transfer stats into a common queue
                self.__file_stats_to_local_queue.put(transfer_info)

                self.__log.info('Copied HDFS file %s to a local path %s', hdfs_filepath, dest_dir)

                is_copy_complete = True
                retries_available = 0

                return True
            except Exception as err_msg:
                self.__log.exception("Error while copying an HDFS file: %s", err_msg)

                retries_available -= 1

                # give up when retries are disabled or exhausted
                if not auto_retry or retries_available <= 0:
                    is_copy_complete = True
                    raise IOError("Error copying file from HDFS")

                self.__log.debug("Copy retries remaining %d", retries_available)


    def copy_file_from_local(self, local_filepath, hdfs_dest_path, job_id_log=-1, run_pre_checks=False):
        """This function first checks if the given path is an
        existing valid file, and then checks if the DFS destination path
        is an existing directory. If all this is true, it will copy the
        file from the local FS to the DFS. If the file already exists in
        the destination DFS directory, it is over-written. Returns true
        on success. Raises IOError on HDFS errors.
        """
        COPY_FROM_LOCAL_ARG = '-copyFromLocal'
        try:
            #ret = pyhdfs.hdfsDelete(self.hdfs, filename)

            # strip the directory separator if there
            hdfs_dest_path = hdfs_dest_path.rstrip(ma.const.dfs_dir_sep)

            filename = os.path.basename(local_filepath)
            dest_filepath = hdfs_dest_path + ma.const.dfs_dir_sep + filename

            if run_pre_checks:
                # check if the file exists on this node
                if not os.path.isfile(local_filepath):
                    self.__log.warning('The filepath does not exist on the local FS: %s', local_filepath)
                    return False

                # check the destination is an existing DFS directory
                ret = self.check_path(hdfs_dest_path)
                if ret != 0:
                    self.__log.warning('The destination DFS path provided does not exist: %s', hdfs_dest_path)
                    return False

                # delete if file already exists in the destination DFS dir
                ret = self.check_path(dest_filepath)
                if ret == 1:
                    self.__log.info('Deleting old DFS file: %s', dest_filepath)
                    self.delete_file(dest_filepath)

            # local size is known up front, unlike the copy-to-local case
            size = os.path.getsize(local_filepath)

            # create file stats object; lock guards the shared transfer id
            with self.__stats_to_dfs_lock:
                transfer_info = filetransferinfo.FileTransferInfo(self.__file_transfer_to_dfs_id, dest_filepath, size, job_id_log)
                # adjust the id for next transfer
                self.__file_transfer_to_dfs_id = filetransferinfo.get_next_id(self.__file_transfer_to_dfs_id)

            # copy local file to the DFS dir
            cmd = [ self.__hadoop_exec_path, self.__EXEC_DFS_PARAMETER, COPY_FROM_LOCAL_ARG, local_filepath, hdfs_dest_path ]
            self.__log.debug('Executing HDFS command: %s', str(cmd))
            transfer_info.note_transfer_start()
            ret = subprocess.call(cmd)
            transfer_info.note_transfer_end()

            # non-zero exit code means the hadoop CLI failed
            if ret != 0:
                raise Exception('HDFS Error')

            # add file bytes to total bytes transferred
            with self.__stats_to_dfs_lock:
                self.__copied_to_dfs_bytes += size

            # insert file transfer stats into a common queue
            self.__file_stats_to_dfs_queue.put(transfer_info)

            self.__log.info('Copied file %s to DFS path %s', local_filepath, hdfs_dest_path)

            return True
        except Exception as err_msg:
            self.__log.exception("Error while copying file to HDFS: %s", err_msg)
            raise IOError("Error copying file to HDFS")


    def bytes_copied_to_local(self):
        """returns the number of bytes copied from the DFS to the local FS.
        It can be used to get the number of bytes cumulatively read for
        input files.
        """
        # BUGFIX: previously returned __copied_to_dfs_bytes (copy/paste
        # error from bytes_copied_to_dfs)
        return self.__copied_to_local_bytes


    def bytes_copied_to_dfs(self):
        """returns the number of bytes copied to the DFS. It can be used to
        get the number of bytes cumulatively written for output files
        """
        return self.__copied_to_dfs_bytes


    def flush_file_transfer_to_dfs_infos(self, job_id=None):
        """Returns all the FileTransferInfo items in the internal queue for
        files transferred from the Local FS to the DFS. It also empties the
        queue. If the job_id is given it will only give the FileTransferInfo
        objs for that particular job (non-matching items are re-queued).
        """

        return_list = []
        # get the current size (a snapshot; the queue may change concurrently)
        sz = self.__file_stats_to_dfs_queue.qsize()
        idx = 0

        # iterate the queue till the estimated size or till empty
        while idx < sz and not self.__file_stats_to_dfs_queue.empty():
            filetxinfo = self.__file_stats_to_dfs_queue.get()
            if job_id is not None and job_id != filetxinfo.job_id:
                # push back in queue again if the job_id doesnt match
                self.__file_stats_to_dfs_queue.put(filetxinfo)
            else:
                # if job_id == None then push all, or job_id matches
                return_list.append(filetxinfo)

            # increment size iterated
            idx += 1

        return return_list


    def flush_file_transfer_to_local_infos(self, job_id=None):
        """Returns all the FileTransferInfo items in the internal queue for
        files transferred from the DFS to the Local FS. It also empties the
        queue. If the job_id is given it will only give the FileTransferInfo
        objs for that particular job (non-matching items are re-queued).
        """

        return_list = []
        # get the current size (a snapshot; the queue may change concurrently)
        sz = self.__file_stats_to_local_queue.qsize()
        idx = 0

        # iterate the queue till the estimated size or till empty
        while idx < sz and not self.__file_stats_to_local_queue.empty():
            filetxinfo = self.__file_stats_to_local_queue.get()
            if job_id is not None and job_id != filetxinfo.job_id:
                # push back in queue again if the job_id doesnt match
                self.__file_stats_to_local_queue.put(filetxinfo)
            else:
                # if job_id == None then push all, or job_id matches
                return_list.append(filetxinfo)

            # increment size iterated
            idx += 1

        return return_list
    
        
if __name__ == "__main__":
    # Manual entry point; no automated test harness is wired up here.
    print("Run HDFS tests")